package cn.sheep.violet.report

import cn.sheep.violet.bean.ProCity
import cn.sheep.violet.config.ConfigHandler
import com.alibaba.fastjson.{JSON, JSONObject}
import com.google.gson.Gson
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext


/**
  * author: old sheep
  * QQ: 64341393 
  * Created 2018/10/15
  */
object ProCityAnalysisCore {

    /**
      * Batch job: counts ad-log records per (province, city) pair and writes
      * the result as one JSON document per line (Gson-serialized [[ProCity]]).
      *
      * @param args optional; args(0) overrides the output directory
      *             (defaults to the original hard-coded path).
      */
    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf()
        sparkConf.setAppName("省市数据分布统计-core")
        sparkConf.setMaster("local[*]")
        // Use Kryo serialization for smaller/faster shuffle payloads.
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(sparkConf)

        val sQLContext = new SQLContext(sc)

        // Load the source data stored as Parquet.
        val dataFrame = sQLContext.read.parquet(ConfigHandler.parquetFilePath)

        // Backward-compatible generalization: allow the output directory to be
        // supplied on the command line; fall back to the original location.
        val outputPath = if (args.nonEmpty) args(0) else "F:\\violet\\report\\coreJson"

        dataFrame.map(row => {
            // Emit one count per record keyed by (province, city).
            val pname = row.getAs[String]("provincename")
            val cname = row.getAs[String]("cityname")
            ((pname, cname), 1)
        }).reduceByKey(_ + _)
            // BUG FIX: the original computed gson.toJson(...) but discarded the
            // result and returned the ProCity case class, so saveAsTextFile wrote
            // case-class toString lines instead of JSON. We now emit the JSON
            // string. Also create Gson once per partition (mapPartitions) rather
            // than once per record.
            .mapPartitions(iter => {
                val gson = new Gson()
                iter.map(tp => gson.toJson(ProCity(tp._1._1, tp._1._2, tp._2)))
            })
            .saveAsTextFile(outputPath)

        /**
          * Alternative sinks (notes kept from the original author):
          * 1. foreachPartition {
          *        conn ...   // open one connection per partition
          *        foreach    // write each element
          *    }
          *
          * 2. RDD -> DataFrame.write.jdbc
          *    RDD + schema = DataFrame
          */

        sc.stop()
    }

}
